package com.ihr360.job.core.lanuch.support;

import com.ihr360.commons.context.session.Ihr360Session;
import com.ihr360.commons.context.session.Ihr360SessionContextHolder;
import com.ihr360.job.core.BatchStatus;
import com.ihr360.job.core.ExitStatus;
import com.ihr360.job.core.Job;
import com.ihr360.job.core.JobParameters;
import com.ihr360.job.core.JobParametersInvalidException;
import com.ihr360.job.core.entity.JobExecution;
import com.ihr360.job.core.entity.StepExecution;
import com.ihr360.job.core.lanuch.JobLauncher;
import com.ihr360.job.core.repository.JobExecutionAlreadyRunningException;
import com.ihr360.job.core.repository.JobInstanceAlreadyCompleteException;
import com.ihr360.job.core.repository.JobRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.Assert;

/**
 * Simple implementation of the {@link JobLauncher} interface. The Spring Core
 * {@link TaskExecutor} interface is used to launch a {@link Job}. This means
 * that the type of executor set is very important. If a
 * {@link SyncTaskExecutor} is used, then the job will be processed
 * <strong>within the same thread that called the launcher.</strong> Care should
 * be taken to ensure any users of this class understand fully whether or not
 * the implementation of TaskExecutor used will start tasks synchronously or
 * asynchronously. The default setting uses a synchronous task executor.
 * <p>
 * There is only one required dependency of this Launcher, a
 * {@link JobRepository}. The JobRepository is used to obtain a valid
 * JobExecution. The Repository must be used because the provided {@link Job}
 * could be a restart of an existing {@link JobInstance}, and only the
 * Repository can reliably recreate it.
 *
 * @author Lucas Ward
 * @author Dave Syer
 * @author Will Schipp
 * @author Michael Minella
 * @see JobRepository
 * @see TaskExecutor
 * @since 1.0
 */
public class SimpleJobLauncher implements JobLauncher, InitializingBean {

    protected static final Logger logger = LoggerFactory.getLogger(SimpleJobLauncher.class);

    private JobRepository jobRepository;

    private TaskExecutor taskExecutor;

    /**
     * Run the provided job with the given {@link JobParameters}. The
     * {@link JobParameters} will be used to determine if this is an execution
     * of an existing job instance, or if a new one should be created.
     *
     * @param job           the job to be run.
     * @param jobParameters the {@link JobParameters} for this particular
     *                      execution.
     * @return JobExecutionAlreadyRunningException if the JobInstance already
     * exists and has an execution already running.
     * @throws JobRestartException                 if the execution would be a re-start, but a
     *                                             re-start is either not allowed or not needed.
     * @throws JobInstanceAlreadyCompleteException if this instance has already
     *                                             completed successfully
     * @throws JobParametersInvalidException
     */
    @Override
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
            JobParametersInvalidException {

        Assert.notNull(job, "The Job must not be null.");
        Assert.notNull(jobParameters, "The JobParameters must not be null.");

        final JobExecution jobExecution;
        JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
        if (lastExecution != null) {
            if (!job.isRestartable()) {
                throw new JobRestartException("JobInstance already exists and is not restartable");
            }
            /*
             * validate here if it has stepExecutions that are UNKNOWN
			 * retrieve the previous execution and check
			 */
            for (StepExecution execution : lastExecution.getStepExecutions()) {
                if (execution.getStatus() == BatchStatus.UNKNOWN) {
                    //throw
                    throw new JobRestartException("Step [" + execution.getStepName() + "] is of status UNKNOWN");
                }//end if
            }//end for
        }

        // Check the validity of the parameters before doing creating anything
        // in the repository...
        job.getJobParametersValidator().validate(jobParameters);

		/*
         * There is a very small probability that a non-restartable job can be
		 * restarted, but only if another process or thread manages to launch
		 * <i>and</i> fail a job execution for this instance between the last
		 * assertion and the next method returning successfully.
		 */
        jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters, job.getStepsCount());

        try {
            Ihr360Session ihr360Session = Ihr360SessionContextHolder.getSession();
            taskExecutor.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        Ihr360SessionContextHolder.setSession(ihr360Session);
                        logger.info(Thread.currentThread().getName());
                        logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
                                + "]");
                        job.execute(jobExecution);

                        logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
                                + "] and the following status: [" + jobExecution.getStatus() + "]");
                    } catch (Throwable t) {
                        logger.info("Job: [" + job
                                + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
                                + "]", t);
                        rethrow(t);
                    }
                }

                private void rethrow(Throwable t) {
                    if (t instanceof RuntimeException) {
                        throw (RuntimeException) t;
                    } else if (t instanceof Error) {
                        throw (Error) t;
                    }
                    throw new IllegalStateException(t);
                }
            });
        } catch (TaskRejectedException e) {
            jobExecution.upgradeStatus(BatchStatus.FAILED);
            if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
                jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
            }
            jobRepository.update(jobExecution);
        }

        return jobExecution;
    }

    /**
     * Set the JobRepsitory.
     *
     * @param jobRepository
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /**
     * Set the TaskExecutor. (Optional)
     *
     * @param taskExecutor
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Ensure the required dependencies of a {@link JobRepository} have been
     * set.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(jobRepository != null, "A JobRepository has not been set.");
        if (taskExecutor == null) {
            logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
            taskExecutor = new SyncTaskExecutor();
        }
    }
}
